package com.zrkizzy.common.mq.config;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.zrkizzy.common.core.enums.CommonErrorCode;
import com.zrkizzy.common.mq.RabbitListenerFactory;
import com.zrkizzy.common.mq.RabbitRetryListener;
import com.zrkizzy.common.mq.config.properties.RabbitModuleProperties;
import com.zrkizzy.common.mq.config.properties.RabbitProperties;
import com.zrkizzy.common.mq.constant.RabbitMqConst;
import com.zrkizzy.common.mq.enums.RabbitEnum;
import com.zrkizzy.common.mq.enums.RabbitExchangeTypeEnum;
import com.zrkizzy.common.mq.exception.RabbitMqException;
import com.zrkizzy.common.mq.service.impl.AbstractProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * RabbitMQ配置类
 *
 * @author zhangrongkang
 * @since 2023/10/24
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements SmartInitializingSingleton {

    /**
     * RabbitMQ连接工厂
     */
    private final ConnectionFactory connectionFactory;

    /**
     * RabbitMQ操作管理器
     */
    private final AmqpAdmin amqpAdmin;

    /**
     * YML文件中的配置内容
     */
    private final RabbitModuleProperties rabbitModuleProperties;

    @Autowired
    public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) {
        this.amqpAdmin = amqpAdmin;
        this.rabbitModuleProperties = rabbitModuleProperties;
        this.connectionFactory = connectionFactory;
    }

    /**
     *
     */
    @Override
    public void afterSingletonsInstantiated() {
        StopWatch stopWatch = StopWatch.create("RabbitMQ");
        stopWatch.start();
//        log.info("初始化配置RabbitMQ...");
        // 获取到YML中的配置
        List<RabbitProperties> moduleProperties = rabbitModuleProperties.getModules();
        // 校验当前配置文件中的RabbitMQ属性是否为空
        if (CollectionUtils.isEmpty(moduleProperties)) {
            log.warn("项目未配置RabbitMQ");
            // 直接返回
            return;
        }
        for (RabbitProperties module : moduleProperties) {
            try {
                // 创建队列
                Queue queue = createQueue(module);
                // 创建交换机
                Exchange exchange = createExchange(module);
                // 绑定队列和交换机
                bindQueueAndExchange(queue, exchange, module);
                // 绑定生产者
                bindProducer(module);
                // 绑定消费者
                bindConsumer(queue, exchange, module);
//                log.info("================================ 完成队列初始化：{} ================================", queue.getName());
            } catch (Exception e) {
                log.error("RabbitMQ初始化失败");
            }
        }
        stopWatch.stop();
        log.info("初始化RabbitMQ配置成功耗时: {}ms", stopWatch.getTotalTimeMillis());
    }

    /**
     * 绑定消费者
     *
     * @param queue 队列
     * @param exchange 交换机
     * @param module RabbitMQ属性配置类
     */
    private void bindConsumer(Queue queue, Exchange exchange, RabbitProperties module) {
//        log.info("开始绑定队列 [{}] 消费者...", queue.getName());
        // 定义RabbitMQ重试监听器
        RabbitRetryListener rabbitRetryListener = null;
        try {
            // 如果当前队列指定了重试逻辑
            if (StrUtil.isNotBlank(module.getRetry())) {
                // 获取具体实现
                rabbitRetryListener = SpringUtil.getBean(module.getRetry());
            } else {
                log.warn("队列 [{}] 未指定重试类，如果需要重试逻辑则需要具体实现", queue.getName());
            }
        } catch (Exception e) {
            log.warn("Spring容器中未找到重试类 [{}]，若需要重试则需要做具体实现", module.getRetry());
        }
        try {
            // 通过连接工厂绑定消费者
            RabbitListenerFactory rabbitListenerFactory = RabbitListenerFactory.builder()
                    // 连接工厂
                    .connectionFactory(connectionFactory)
                    // 队列
                    .queue(queue)
                    // 交换机
                    .exchange(exchange)
                    // 消费者
                    .consumer(SpringUtil.getBean(module.getConsumer()))
                    // 重试回调
                    .rabbitRetryListener(rabbitRetryListener)
                    // 自动确认
                    .autoAck(module.getAutoAck())
                    // RabbitMQ操作管理器
                    .amqpAdmin(amqpAdmin).build();
            // 通过工厂获取对应容器
            SimpleMessageListenerContainer container = rabbitListenerFactory.getObject();
            // 判断容器是否为空
            if (Objects.nonNull(container)) {
                // 启动容器
                container.start();
//                log.info("队列 [{}] 消费者绑定成功: {}", module.getQueue().getName(), module.getConsumer());
            }
        } catch (Exception e) {
            log.warn("Spring容器中未找到消费者 [{}]，若需要此消费者则需要做具体实现", module.getConsumer());
        }
    }

    /**
     * 绑定生产者
     *
     * @param module RabbitMQ属性配置类
     */
    private void bindProducer(RabbitProperties module) {
        try {
//            log.info("开始绑定队列 [{}] 生产者...", module.getQueue().getName());
            // 获取生产者
            AbstractProducer rabbitProducer = SpringUtil.getBean(module.getProducer());
            // 设置交换机名称
            rabbitProducer.setExchange(module.getExchange().getName());
            // 设置路由
            rabbitProducer.setRoutingKey(module.getRoutingKey());
//            log.info("队列 [{}] 生产者绑定成功: {}", module.getQueue().getName(), module.getProducer());
        } catch (Exception e) {
            log.warn("Spring容器中未找到生产者 [{}]，若需要此生产者则需要做具体实现", module.getProducer());
        }
    }

    /**
     * 绑定队列和交换机
     *
     * @param queue 队列
     * @param exchange 交换机
     * @param module RabbitMQ属性配置类
     */
    private void bindQueueAndExchange(Queue queue, Exchange exchange, RabbitProperties module) {
        // 获取交换机名称
        String exchangeName = module.getExchange().getName();
//        log.info("开始绑定队列 [{}] 与交换机 [{}]...", queue.getName(), exchangeName);
        // 获取队列名称
        String queueName = module.getQueue().getName();
        // 格式化队列和交换机路由
        module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey()));
        // 获取路由
        String routingKey = module.getRoutingKey();
        // 绑定交换机与路由
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
        // 声明队列、交换机、绑定对象到RabbitMQ操作管理器中
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareExchange(exchange);
        amqpAdmin.declareBinding(binding);
//        log.info("队列 [{}] 与 交换机 [{}] 绑定成功", queueName, exchangeName);
    }

    /**
     * 创建交换机
     *
     * @param module RabbitMQ属性配置类
     * @return 交换机
     */
    private Exchange createExchange(RabbitProperties module) {
        // 获取YML文件中的交换机
        RabbitProperties.Exchange exchange = module.getExchange();
        // 获取交换机类型
        RabbitExchangeTypeEnum exchangeType = exchange.getType();
        // 格式化交换机名称
        exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName()));
        // 获取交换机参数
        String exchangeName = exchange.getName();
        Boolean isDurable = exchange.getDurable();
        Boolean isAutoDelete = exchange.getAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        // 构建交换机并返回
        return getExchangeByType(exchangeType, exchangeName, isDurable, isAutoDelete, arguments);
    }

    /**
     * 通过交换机类型构建交换机
     *
     * @param exchangeType 交换机类型
     * @param exchangeName 交换机名称（格式化）
     * @param isDurable 是否持久化
     * @param isAutoDelete 是否自动删除
     * @param arguments 交换机其他参数
     * @return 交换机
     */
    private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, Boolean isDurable, Boolean isAutoDelete, Map<String, Object> arguments) {
        return switch (exchangeType) {
            // 直连交换机
            case DIRECT -> new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // 主题交换机
            case TOPIC -> new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // 扇形交换机
            case FANOUT -> new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // 头部交换机
            case HEADERS -> new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // 抛出系统配置错误异常
            default -> throw new RabbitMqException(CommonErrorCode.MESSAGE_SERVICE_ERROR);
        };
    }

    /**
     * 创建队列
     *
     * @param module RabbitMQ属性配置类
     * @return 队列
     */
    private Queue createQueue(RabbitProperties module) {
        RabbitProperties.Queue queue = module.getQueue();
        // 格式化队列名称
        queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName()));
//        log.info("================================ 开始初始化队列：{} ================================", queue.getName());
        // 获取队列其他参数
        Map<String, Object> arguments = queue.getArguments();
        // 如果其他参数为空
        if (MapUtil.isEmpty(arguments)) {
            // 初始化Map保证不出错
            arguments = new HashMap<>();
        }

        // 将ttl类型数据转为Long类型
        if (arguments.containsKey(RabbitMqConst.X_MESSAGE_TTL)) {
            arguments.put(RabbitMqConst.X_MESSAGE_TTL, Convert.toLong(arguments.get(RabbitMqConst.X_MESSAGE_TTL)));
        }
        // 获取死信队列参数
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        // 校验当前队列是否有死信队列
        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            // 格式化死信队列名称
            deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange);
            // 格式化死信队列路由
            deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey);
            // 绑定死信队列参数
            arguments.put(RabbitMqConst.X_DEAD_LETTER_EXCHANGE, deadLetterExchange);
            arguments.put(RabbitMqConst.X_DEAD_LETTER_ROUTING_KEY, deadLetterRoutingKey);

//            log.info("死信队列参数: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey);
        }

        return new Queue(queue.getName(), queue.getDurable(), queue.getExclusive(), queue.getAutoDelete(), arguments);
    }
}
